Skip to content

Conversation

@knoepfel
Copy link
Member

Issue #24 describes the need to adjust the caching of data products. This PR includes an exploration of using repeater nodes.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Explores introducing a “repeater” concept for multi-layer consumption by refactoring how flush/caching signals are delivered through the flow graph, and adds a dedicated test harness to exercise the approach.

Changes:

  • Added a new test/repeater test suite with an index_router and repeater_node prototype to explore cached product reuse across layers.
  • Refactored flush propagation: flush messages are no longer routed through the multiplexer; they are broadcast via dedicated flusher nodes and consumed through new flush_port() receivers on core node types.
  • Tightened several tests from “>=” to strict “==” execution-count expectations and exposed layer_generator::layer_paths() for routing.

Reviewed changes

Copilot reviewed 27 out of 27 changed files in this pull request and generated 12 comments.

Show a summary per file
File Description
test/repeater/repeater_node.hpp Adds prototype repeater composite node with caching/flush-driven replay behavior.
test/repeater/repeater.cpp Adds Catch2 test constructing a multi-layer graph using the repeater exploration nodes.
test/repeater/nodes.hpp Adds provider/consumer and multi-argument consumer test nodes built around repeater nodes.
test/repeater/message_types.hpp Defines test message types and matchers used by the repeater graph.
test/repeater/index_router.hpp Declares index routing helper for broadcasting indices/flush tokens to multi-layer nodes.
test/repeater/index_router.cpp Implements routing/backout logic and multilayer broadcasting.
test/repeater/CMakeLists.txt Builds the repeater test and supporting index_router library.
test/CMakeLists.txt Enables building the new test/repeater subdirectory.
test/hierarchical_nodes.cpp Tightens execution count check for get_the_time.
test/cached_execution.cpp Tightens cached execution checks from >= to ==.
test/allowed_families.cpp Tightens provider execution count checks from >= to ==.
plugins/layer_generator.hpp Exposes layer_paths() accessor for test routing.
phlex/core/store_counters.hpp Adds detect_flush_flag::receive_flush(message const&) declaration to centralize flush handling.
phlex/core/store_counters.cpp Implements receive_flush() and fixes map insertion logic for store_flag.
phlex/core/multiplexer.cpp Stops routing flush messages through the multiplexer (now asserts non-flush only).
phlex/core/message_sender.hpp Switches flush delivery dependency from multiplexer to flusher_t.
phlex/core/message_sender.cpp Sends flush messages via flusher_t broadcast node instead of multiplexer.
phlex/core/fwd.hpp Adds message fwd-decl and introduces flusher_t alias.
phlex/core/framework_graph.hpp Adds a graph-wide flusher_ and wires message_sender to it.
phlex/core/framework_graph.cpp Connects graph-wide and unfold-local flushers to each node’s new flush_port().
phlex/core/declared_unfold.hpp Adds flush_port(), per-unfold flusher(), and tracks child_layer on unfolds.
phlex/core/declared_unfold.cpp Stores child_layer and uses parent-based flush store generation.
phlex/core/declared_transform.hpp Adds flush_port() receiver node; main transform path now asserts non-flush.
phlex/core/declared_provider.hpp Adds flush_port() receiver node; main provider path now asserts non-flush.
phlex/core/declared_predicate.hpp Adds flush_port() receiver node; main predicate path now asserts non-flush.
phlex/core/declared_observer.hpp Adds flush_port() receiver node; main observer path now asserts non-flush.
phlex/core/declared_fold.hpp Adds flush_port() receiver node and removes flush handling from the main fold path.

Comment on lines +143 to +152
if (entry->counter == 0) {
spdlog::debug("[{}/{}] ++ Outputting {} ({})",
node_name_,
layer_,
entry->msg_with_product->index->to_string(),
key);
output.try_put(*entry->msg_with_product);
++entry->counter;
}
assert(entry->counter == 1);
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the !caching_required_ cleanup path, the code dereferences entry->msg_with_product without checking it is non-null. This can happen if the cache entry was created by queued index_message IDs before the product arrives, and then caching is disabled, leaving msg_with_product unset. This can crash the test. Guard against a null msg_with_product here (and consider draining/clearing any queued IDs when switching to non-caching mode).

Copilot uses AI. Check for mistakes.
knoepfel and others added 3 commits February 3, 2026 13:17
Minor adjustments

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant